{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# The Flint Time Series Library\n",
    "[Flint](https://github.com/twosigma/flint) is a time series library for Apache Spark. The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark.\n",
    "Flint is [Two Sigma's](http://opensource.twosigma.com/) implementation of highly optimized time series operations in Spark.\n",
    "It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.\n",
    "\n",
    "Flint is an open source library for Spark based around the `TimeSeriesRDD`, a time series aware data structure, and a collection of time series utility and analysis functions that use `TimeSeriesRDD`s.\n",
    "Unlike `DataFrame` and `Dataset`, Flint's `TimeSeriesRDD`s can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties.\n",
    "It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data.\n",
    "\n",
    "This example uses `prices.csv` file from [Kaggle](https://www.kaggle.com/dgawlik/nyse). For it to work you need to get it and put it in `/tmp/prices.csv`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%classpath add mvn\n",
    "org.apache.spark spark-sql_2.12 2.4.4\n",
    "org.apache.spark spark-mllib_2.12 2.4.4\n",
    "com.github.twosigma flint 6055a7a231"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val begin = \"20150101\"\n",
    "val end   = \"20160101\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%spark -s"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import org.apache.spark.sql.types._\n",
    "import org.apache.spark.sql.Row\n",
    "//import com.twosigma.flint.timeseries.io.read\n",
    "import com.twosigma.flint.timeseries.Windows\n",
    "import com.twosigma.flint.timeseries.Summarizers\n",
    "import scala.concurrent.duration._\n",
    "//import com.twosigma.flint.timeseries.implicits._\n",
    "import com.twosigma.flint.timeseries._"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read Local Files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import com.twosigma.flint.timeseries.CSV\n",
    "\n",
    "//Load prices.csv from https://www.kaggle.com/dgawlik/nyse\n",
    "\n",
    "//Creates a TimeSeriesRDD from a CSV file\n",
    "var pricesRdd = CSV.from(\n",
    "  spark.sqlContext,\n",
    "  \"file:///tmp/prices.csv\",\n",
    "  header = true,\n",
    "  timeColumnName = \"date\",  \n",
    "  dateFormat = \"dd/MM/yyyy HH:mm\",\n",
    "  sorted = false\n",
    ")\n",
    "pricesRdd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pricesRdd.display(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Basic Operations"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val priceAsInteger = pricesRdd.cast(\"close\" -> IntegerType)\n",
    "preview(priceAsInteger)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val filteredRowsByPrice = pricesRdd.keepRows { row: Row => row.getAs[Double](\"low\") > 4.0 }\n",
    "preview(filteredRowsByPrice)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val timeColumnOnly = pricesRdd.keepColumns(\"time\")\n",
    "preview(timeColumnOnly)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val withoutIdColumn = pricesRdd.deleteColumns(\"symbol\")\n",
    "preview(withoutIdColumn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val renamedColumns = pricesRdd.renameColumns(\"symbol\" -> \"ticker\", \"low\" -> \"lowPrice\", \"open\" -> \"openPrice\", \"close\" -> \"closePrice\", \"high\" -> \"highPrice\")\n",
    "preview(renamedColumns)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Basic arithmetic on each row"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Calculate logarithm of a column\n",
    "val logVolumeRdd = pricesRdd.addColumns(\"logVolume\" -> DoubleType -> { row => scala.math.log(row.getAs[Double](\"volume\")) })\n",
    "preview(pricesRdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Raise a column to an exponent\n",
    "val squaredVolumeRdd = pricesRdd.addColumns(\"squaredVolume\" -> DoubleType -> { row => scala.math.pow(row.getAs[Double](\"volume\"), 2) })\n",
    "preview(squaredVolumeRdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Calculate difference between two columns\n",
    "val priceChangeRdd = pricesRdd.addColumns(\"priceChange\" -> DoubleType -> { row => \n",
    "    row.getAs[Double](\"close\") - row.getAs[Double](\"open\")\n",
    "})\n",
    "preview(priceChangeRdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "val pricePercentChange = pricesRdd.addColumns(\"pricePercentChange\" -> DoubleType -> { row =>\n",
    "    val openPrice = row.getAs[Double](\"open\")\n",
    "    val closePrice = row.getAs[Double](\"close\")\n",
    "    if (openPrice != 0) (closePrice - openPrice) / openPrice else null\n",
    "})\n",
    "preview(pricePercentChange)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Filtering"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Select rows where the price went up\n",
    "val priceIncreasedRdd = pricesRdd.keepRows { row =>\n",
    "    row.getAs[Double](\"close\") > row.getAs[Double](\"open\")\n",
    "}\n",
    "preview(priceIncreasedRdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// The keepRows and deleteRows functions take a function from Row to Boolean as a filtering criteria.\n",
    "// Only get rows whose symbol starts with 'A'\n",
    "val startsWithARdd = pricesRdd.keepRows { row =>\n",
    "    val symbol = row.getAs[String](\"symbol\")\n",
    "    symbol != null && symbol.startsWith(\"A\")\n",
    "}\n",
    "preview(startsWithARdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "//Remove all rows whose volumn is less than 2000000\n",
    "val lowVolumeRdd  = pricesRdd.keepRows { row =>\n",
    "    row.getAs[Double](\"volume\") < 2000000\n",
    "}\n",
    "preview(lowVolumeRdd)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Using history with window"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Moving average over the last two weeks \n",
    "val ibmPricesRdd = pricesRdd.keepRows { row =>\n",
    "    row.getAs[String](\"symbol\") == \"IBM\"\n",
    "}\n",
    "var windowedIbmPricesRdd = ibmPricesRdd.addWindows(Windows.pastAbsoluteTime(\"14days\"))\n",
    "windowedIbmPricesRdd = windowedIbmPricesRdd.addColumns(\"movingAverage\" -> DoubleType -> { row =>\n",
    "    val pastRows = row.getAs[Seq[Row]](\"window_past_14days\")\n",
    "    pastRows.map(_.getAs[Double](\"close\")).sum / pastRows.size\n",
    "})\n",
    "preview(windowedIbmPricesRdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Moving average over the last two weeks for all symbols \n",
    "var pastWindowRdd = pricesRdd.addWindows(Windows.pastAbsoluteTime(\"14days\"), Seq(\"symbol\"))\n",
    "pastWindowRdd = pastWindowRdd.addColumns(\"movingAverage\" -> DoubleType -> { row =>\n",
    "    val pastRows = row.getAs[Seq[Row]](\"window_past_14days\")\n",
    "    pastRows.map(_.getAs[Double](\"close\")).sum / pastRows.size\n",
    "})\n",
    "preview(pastWindowRdd)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Calculating values for a cycle"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// addColumnsForCycle takes a closure that is applied to a list of rows and returns a map from row to result. The list contains all rows that share a timestamp.\n",
    "\n",
    "// Add a column containing the number of instruments in the universe on each day\n",
    "val cycleRdd = pricesRdd.addColumnsForCycle(\"universeSize\" -> IntegerType -> { rows: Seq[Row] =>\n",
    "    rows.map { row => row -> rows.size }.toMap\n",
    "})\n",
    "preview(cycleRdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Compute the Z score across an interval\n",
    "val zScoreRdd = pricesRdd.addColumnsForCycle(\"volumeZScore\" -> DoubleType -> { rows: Seq[Row] =>\n",
    "    val mean = rows.map(_.getAs[Double](\"volume\")).sum / rows.size\n",
    "    val stddev = scala.math.sqrt(rows.map { row =>\n",
    "        scala.math.pow(row.getAs[Double](\"close\") - mean, 2)\n",
    "    }.sum ) / (rows.size - 1)\n",
    "    rows.map { row =>\n",
    "        row -> (row.getAs[Double](\"close\") - mean) / stddev\n",
    "    }.toMap\n",
    "})\n",
    "preview(zScoreRdd)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Add a column with rankings with the same timestamp\n",
    "import org.apache.commons.math3.stat.ranking.NaturalRanking\n",
    "\n",
    "val rankedRdd = pricesRdd.addColumnsForCycle(\"r\" -> DoubleType -> { rows: Seq[Row] =>\n",
    "    val ranking = new NaturalRanking()\n",
    "    val ranks = ranking.rank(rows.map(_.getAs[Double](\"volume\")).toArray)\n",
    "    (rows zip ranks).toMap\n",
    "})\n",
    "preview(rankedRdd)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Intervalizing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Volume weighted average price for every 7 days for IBM\n",
    "val clock = Clocks.uniform(sc, \"7d\")\n",
    "var ibmPricesRdd = pricesRdd.keepRows { row =>\n",
    "    row.getAs[String](\"symbol\") == \"IBM\"\n",
    "}\n",
    "var volumeWeightedRdd = ibmPricesRdd.groupByInterval(clock).addColumns(\"volumeWeightedPrice\" -> DoubleType -> { row =>\n",
    "    val rows = row.getAs[Seq[Row]](\"rows\")\n",
    "    val weightedSum = rows.map { row =>\n",
    "        (row.getAs[Double](\"open\") + row.getAs[Double](\"close\")) / 2 * row.getAs[Double](\"volume\")\n",
    "    }.sum\n",
    "    weightedSum / rows.map (_.getAs[Double](\"volume\")).sum\n",
    "}).deleteColumns(\"rows\")\n",
    "preview(volumeWeightedRdd)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Aggregating"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "// Average daily volume\n",
    "val volumeRdd = pricesRdd.summarize(Summarizers.nthMoment(\"volume\", 1), Seq(\"symbol\"))\n",
    "preview(volumeRdd)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Regression with Open Source Package"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "//stat.regression\n",
    "import breeze.linalg.DenseVector\n",
    "import org.apache.spark.mllib.random.RandomRDDs\n",
    "import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint\n",
    "import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression\n",
    "\n",
    "// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0\n",
    "val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)\n",
    "\n",
    "// Fit the data using the OLS linear regression.\n",
    "val model = OLSMultipleLinearRegression.regression(data)\n",
    "\n",
    "// Retrieve the estimate beta and intercept.\n",
    "val denseVector = model.estimateRegressionParameters\n",
    "\n",
    "Map(denseVector.activeIterator.toSeq.map { m => m._1 -> m._2} : _*)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Scala",
   "language": "scala",
   "name": "scala"
  },
  "language_info": {
   "codemirror_mode": "text/x-scala",
   "file_extension": ".scala",
   "mimetype": "",
   "name": "Scala",
   "nbconverter_exporter": "",
   "version": "2.11.12"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": false,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {},
   "toc_section_display": false,
   "toc_window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
